This notebook will only work in a Jupyter session running on mathmadslinux2p.
You can start your own Jupyter session on mathmadslinux2p and open this notebook in Chrome on the MADS Windows server.
Steps
1. ssh mathmadslinux2p
2. Run start_pyspark_notebook or /opt/anaconda3/bin/jupyter-notebook --ip 132.181.129.68 --port $((8000 + $((RANDOM % 999))))
3. Run start_spark() to start a spark session in the notebook.
4. Run stop_spark() before closing the notebook or kill your spark application by hand using the link in the Spark UI.
# Run this cell to import pyspark and to define start_spark() and stop_spark()
import findspark
findspark.init()
import getpass
import pandas
import pyspark
import random
import re
from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession
# Functions used below
def username():
    """Return the current login name with any '@domain' suffix removed."""
    login = getpass.getuser()
    return re.sub('@.*', '', login)
def dict_to_html(d):
    """Convert a Python dictionary into a two column HTML table for display.

    Args:
        d (dict): mapping rendered as one key/value row per item,
            in insertion order.

    Returns:
        str: the complete HTML table markup as a single string.
    """
    # Plain strings (not f-strings) for the fixed markup; a generator
    # expression replaces the manual append loop.
    rows = ''.join(
        f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>'
        for k, v in d.items()
    )
    return (
        '<table width="100%" style="width:100%; font-family: monospace;">'
        + rows
        + '</table>'
    )
def show_as_html(df, n=20):
    """Render a spark dataframe in the notebook via pandas' HTML display.

    Args:
        df: spark dataframe to display.
        n (int): number of rows to show (default: 20)
    """
    head = df.limit(n)
    display(head.toPandas())
def display_spark():
    """Display the status of the active Spark session if one is currently running.

    Reads the module-level ``spark``/``sc`` globals defined by
    start_spark(): when both exist an "active" panel with links and the
    full config is shown, otherwise a "stopped" panel.
    """
    session_active = 'spark' in globals() and 'sc' in globals()
    if session_active:
        name = sc.getConf().get("spark.app.name")
        parts = [
            '<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            '<ul>',
            '<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>',
            '</ul>',
            '<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            '<p><b>Notes</b></p>',
            '<ul>',
            '<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            '</ul>',
        ]
    else:
        parts = [
            '<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>',
            '<ul>',
            '<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            '</ul>',
        ]
    display(HTML(''.join(parts)))
# Functions to start and stop spark
def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): executor memory in GB (default: 1)
        master_memory (float): driver memory in GB (default: 1)
    """
    global spark
    global sc
    user = username()
    cores = executor_instances * executor_cores
    # Heuristic: four shuffle partitions per available core.
    partitions = cores * 4
    # Random UI port (4001-4999) to avoid clashes with other users'
    # sessions on the shared host.
    port = 4000 + random.randint(1, 999)
    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        # Per-user derby home -- presumably so concurrent users on this
        # host don't contend over one derby database; confirm with admin.
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        # Fixed resource allocation: dynamic allocation off, total cores capped.
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", f"{worker_memory}g")
        .config("spark.driver.memory", f"{master_memory}g")
        # "0" removes the limit on collected result size -- use with care.
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        # App name "<user> (jupyter)" is what to look for in the Spark UI.
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    display_spark()
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).

    Safe to call when no session is running; always re-renders the
    session status panel afterwards.
    """
    global spark
    global sc
    session_defined = 'spark' in globals() and 'sc' in globals()
    if session_defined:
        spark.stop()
        del spark
        del sc
    display_spark()
# Make css changes to improve spark output readability:
# - keep <pre> output unwrapped,
# - stop dataframe cells from wrapping,
# - hide the pandas index column in displayed dataframes.
html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))
The code below provides a template for how you would use a notebook to start spark, run some code, and then stop spark.
Steps
1. Run start_spark() to start a spark session in the notebook (only change the default resources when advised to do so for an exercise or assignment).
2. Run stop_spark() before closing the notebook or kill your spark application by hand using the link in the Spark UI.
# Run this cell to start a spark session in this notebook
start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)
import sys
# NOTE(review): Windows-style site-packages path appended on what appears
# to be a Linux cluster -- harmless if the path is absent, but confirm
# where iso3166/joshuaproject are actually installed.
sys.path.append(r'C:\Users\sjb131\AppData\Roaming\Python\Python311\site-packages')
from iso3166 import countries as iso_country_codes
from joshuaproject.countrycodes import CountryCodes as fips_country_codes
from pyspark.sql import functions as F
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
import pandas as pd
import os
# Load pre-processed GHCN-Daily station and country metadata from HDFS.
stations = spark.read.parquet("hdfs:///user/sjb131/outputs/ghcnd/ghcnd-stations.parquet")
show_as_html(stations, 10)
countries = spark.read.parquet("hdfs:///user/sjb131/outputs/ghcnd/ghcnd-countries.parquet")
show_as_html(countries, 10)
# Schema for the pre-aggregated per-country yearly rainfall summary CSV
# (the file has no header, so column names come from this schema).
schema_rainfall = StructType([
    StructField("year", StringType(), True),
    StructField("code", StringType(), True),          # country code (FIPS, joined below)
    StructField("total_rainfall", DoubleType(), True),
    StructField("station_count", IntegerType(), True),
    StructField("data_point_count", IntegerType(), True),
    StructField("expected_data_points", IntegerType(), True),
    StructField("average_rainfall", DoubleType(), True),
    StructField("scaling_factor", DoubleType(), True),
    StructField("scaled_avg_rainfall", DoubleType(), True),
])
rainfall = spark.read.csv("hdfs:///user/sjb131/outputs/ghcnd/rainfall.csv", schema=schema_rainfall)
show_as_html(rainfall, 10)
# Schema for the New Zealand daily observations CSV.
schema_daily = StructType([
    StructField("id", StringType(), True),            # station id
    StructField("date", StringType(), True),          # yyyyMMdd string, converted below
    StructField("element", StringType(), True),       # e.g. TMIN / TMAX / PRCP
    StructField("value", DoubleType(), True),
    StructField("measurement_flag", StringType(), True),
    StructField("quality_flag", StringType(), True),
    StructField("source_flag", StringType(), True),
    StructField("observation_time", StringType(), True),
    StructField("station_name", StringType(), True),
])
nz_daily = spark.read.csv("hdfs:///user/sjb131/outputs/ghcnd/nz_daily.csv", schema=schema_daily)
# Dates are stored as yyyyMMdd strings; convert to a proper date column.
nz_daily = nz_daily.withColumn('date', F.to_date('date', 'yyyyMMdd'))
show_as_html(nz_daily, 10)
# Build a FIPS -> ISO alpha-2 lookup and broadcast it so executors share
# one read-only copy instead of shipping the dict with every task.
fips_codes = fips_country_codes()
fips_to_iso_mapping = {row.FIPS: row.ISO for row in fips_codes}
broadcast_fips_to_iso_mapping = spark.sparkContext.broadcast(fips_to_iso_mapping)

def convert_fips_to_iso(fips_code):
    """Return the ISO alpha-2 code for a FIPS code, or None if unmapped."""
    return broadcast_fips_to_iso_mapping.value.get(fips_code)

# Register the function as a UDF
convert_fips_to_iso_udf = F.udf(convert_fips_to_iso)

# ISO alpha-2 -> alpha-3 lookup, also broadcast; the choropleth below
# uses alpha-3 codes as its locations.
alpha2_to_alpha3_mapping = {country.alpha2: country.alpha3 for country in iso_country_codes}
broadcast_alpha2_to_alpha3_mapping = spark.sparkContext.broadcast(alpha2_to_alpha3_mapping)

def convert_alpha2_to_alpha3(alpha2_code):
    """Return the ISO alpha-3 code for an alpha-2 code, or None if unmapped."""
    return broadcast_alpha2_to_alpha3_mapping.value.get(alpha2_code)

# Register the function as a UDF
convert_alpha2_to_alpha3_udf = F.udf(convert_alpha2_to_alpha3)

# Attach ISO codes to each country record: FIPS -> alpha-2 -> alpha-3.
countries = countries.withColumn('country_code_iso_alpha2', convert_fips_to_iso_udf(countries['code']))
countries = countries.withColumn('country_code_iso_alpha3', convert_alpha2_to_alpha3_udf(countries['country_code_iso_alpha2']))
show_as_html(countries, 10)
import seaborn as sns
def get_data_for_plot(element, average_data):
    """Aggregate daily averages into a yearly series ready for plotting.

    Args:
        element (str): element code (e.g. 'TMIN'/'TMAX'); used as the
            name of the aggregated value column.
        average_data: spark dataframe with 'date' and
            'average_temperature' columns.

    Returns:
        tuple: (dates, values) -- pandas series of Jan-1 timestamps per
        year and the matching yearly mean values divided by 10
        (GHCN temperatures are stored in tenths of a degree Celsius).
    """
    yearly = (
        average_data
        .withColumn('year', F.year('date'))
        .groupBy('year')
        .agg(
            F.avg('average_temperature').alias(element),
            F.count('average_temperature').alias('observation_count'),
        )
        .orderBy(F.col('year'))
    )
    # Synthesize a plottable YYYY-01-01 date string from the year alone.
    yearly = yearly.withColumn(
        'date',
        F.concat(F.col('year'), F.lit('-'), F.lit('01'), F.lit('-'), F.lit('01')),
    )
    # Collect both columns in ONE toPandas() call; the original collected
    # each column separately, triggering two spark jobs for the same data.
    collected = yearly.select('date', element).toPandas()
    dates = pd.to_datetime(collected['date'])
    values = collected[element] / 10
    return dates, values
def initialise_plot(standard = True):
    """Create a blank, labelled temperature-vs-year figure.

    Args:
        standard (bool): 10x6 inch figure when True, 20x12 otherwise.

    Returns:
        the pyplot module, with a fresh figure active.
    """
    size = (10, 6) if standard else (20, 12)
    plt.figure(figsize=size)
    plt.xlabel('Year')
    plt.ylabel('Temperature (C)')
    plt.grid(True)
    plt.xticks(rotation=45)
    return plt
def plot_temperature_data(elements, station_codes, daily, country_name="New Zealand"):
    """Plot temperature time series for the requested elements.

    If station_codes is empty, draw one combined plot of the
    country-wide yearly average for all elements; otherwise draw one
    large plot per element with a coloured line per station.

    Args:
        elements (dict): element code -> friendly name,
            e.g. {'TMIN': 'Minimum'}.
        station_codes (dict): station id -> station name; falsy for the
            country-wide average plot.
        daily: spark dataframe of daily observations with 'element',
            'id', 'date' and 'value' columns.
        country_name (str): label used in plot titles.

    Returns:
        list: pyplot handles for the generated plots.
    """
    plots = []
    plt = None
    for element, friendly_name in elements.items():
        # BUG FIX: filter on the `daily` parameter, not the module-level
        # nz_daily -- the original ignored the argument when filtering,
        # so this function only ever worked for the NZ dataframe.
        filtered_data = daily.filter(daily['element'] == element)
        # If station codes list is empty, plot average for the country by grouping by date
        if not station_codes:
            # All elements share one standard-size figure.
            if plt is None:
                plt = initialise_plot()
            # Group and calculate average temperature by date
            average_data = filtered_data.groupBy('date').agg(F.avg('value').alias('average_temperature')).orderBy('date')
            dates, values = get_data_for_plot(element, average_data)
            plt.plot(dates, values, marker='o', linestyle='-')
            plt.title(f'{country_name} Temperature over time')
        else:
            # Per-station: a fresh, larger figure for each element.
            average_data = filtered_data.groupBy('id', 'date').agg(F.avg('value').alias('average_temperature'))
            palette = sns.color_palette("husl", len(station_codes))
            legend_entries = []
            plt = initialise_plot(False)
            for i, (station_id, name) in enumerate(station_codes.items()):
                filtered_average_data = average_data.filter(average_data['id'] == station_id)
                dates, values = get_data_for_plot(element, filtered_average_data)
                plt.plot(dates, values, marker='o', linestyle='-', color=palette[i])
                # Proxy artists so the legend shows one entry per station.
                legend_entries.append(plt.Line2D([], [], color=palette[i], label=name.strip()))
            plt.title(f'{country_name} - Average {friendly_name} Temperature over time by Station')
            plt.legend(handles=legend_entries)
            plots.append(plt)
    if not station_codes:
        # Country-wide branch builds a single shared figure; append it once.
        plots.append(plt)
    return plots
elements = {'TMIN': 'Minimum', 'TMAX': 'Maximum'}
# Country-wide average: an empty station dict produces one combined plot.
overall_average_plot = plot_temperature_data(elements, [], nz_daily)
overall_average_plot[0].show()
# Per-station plots: build {station id -> name} for every distinct NZ station.
nz_stations = stations.filter(stations['station_country_code'] == 'NZ')
distinct_nz_stations = nz_stations.select('id', 'name').distinct()
nz_stations_dict = dict(distinct_nz_stations.rdd.map(lambda row: (row['id'], row['name'])).collect())
stations_average_plot = plot_temperature_data(elements, nz_stations_dict, nz_daily)
for plot in stations_average_plot:
    plot.show()
#https://plotly.com/python/choropleth-maps/
# Countries whose scaling_factor is at or below this cutoff are treated as
# low-confidence and excluded from the map (scaling_factor is presumably
# the observed/expected data-point ratio -- confirm in the upstream job).
confidence_level_cutoff = 0.05
# Find the records with the highest scaled_avg_rainfall
highest_scaled_rainfall_record = rainfall.orderBy(F.desc('scaled_avg_rainfall')).limit(10)
highest_scaled_rainfall_record_with_countries = highest_scaled_rainfall_record.join(countries, highest_scaled_rainfall_record['code'] == countries['code'], 'inner')
# Display the records
show_as_html(highest_scaled_rainfall_record_with_countries.orderBy(F.desc('scaled_avg_rainfall')))
# Find the records with the highest average_rainfall
highest_avg_rainfall_record = rainfall.orderBy(F.desc('average_rainfall')).limit(10)
highest_avg_rainfall_record_with_countries = highest_avg_rainfall_record.join(countries, highest_avg_rainfall_record['code'] == countries['code'], 'inner')
# Display the record
show_as_html(highest_avg_rainfall_record_with_countries.orderBy(F.desc('average_rainfall')))
# Keep 2023 records above the confidence cutoff and convert values from
# tenths of a millimetre to millimetres for display.
rainfall_filtered = rainfall.filter((rainfall['year'] == '2023') & (rainfall['scaling_factor'] > confidence_level_cutoff))
rainfall_filtered = rainfall_filtered.withColumn('scaled_avg_rainfallmm', F.col('scaled_avg_rainfall') / 10)
rainfall_filtered = rainfall_filtered.withColumn('average_rainfallmm', F.col('average_rainfall') / 10)
joined_data = rainfall_filtered.join(countries, rainfall_filtered['code'] == countries['code'], 'inner')
# Create a new dataframe for records with scaling_factor <= 0.05
low_confidence = rainfall.filter((rainfall['year'] == '2023') & (rainfall['scaling_factor'] <= confidence_level_cutoff))
low_confidence_countries = low_confidence.join(countries, low_confidence['code'] == countries['code'], 'inner')
show_as_html(low_confidence_countries, 100)
rainfall_filtered_pd = joined_data.toPandas()
max_rainfall = rainfall_filtered_pd['average_rainfallmm'].max()
fig = go.Figure(data=go.Choropleth(
locations = rainfall_filtered_pd['country_code_iso_alpha3'],
z = rainfall_filtered_pd['average_rainfallmm'],
text = rainfall_filtered_pd['name'],
colorscale = 'Blues',
hovertemplate='%{text}: %{z:.2f} mm', # Format hover text with 2 decimal places and "mm"
autocolorscale=False,
reversescale=False,
marker_line_color='darkgray',
marker_line_width=0.5,
colorbar_title = 'Annual Rainfall (mm)',
zmin=0,
zmax=max_rainfall,
))
fig.update_layout(
title_text='2023 Annual Rainfall',
geo=dict(
showframe=False,
showcoastlines=True, # Show coastlines
projection_type='equirectangular'
),
annotations = [dict(
x=0.55,
y=0.1,
xref='paper',
yref='paper',
text='Source: <a href="https://www.ncei.noaa.gov/products/land-based-station/global-historical-climatology-network-daily">\
GHCN Daily Data</a>',
showarrow = False
)],
coloraxis_colorbar=dict(
title='Annual Rainfall (mm)', # Colorbar title
lenmode='pixels',
len=400, # Adjust colorbar length
)
)
fig.show()
os.system('jupyter nbconvert --to html_embed Plots.ipynb')
# Run this cell before closing the notebook or kill your spark application by hand using the link in the Spark UI
# (frees the cluster cores/memory reserved by start_spark above).
stop_spark()